package com.shujia.flink.core

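// This wildcard import is required: it brings the Flink Scala API's implicit TypeInformation support into scope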
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time


/**
  * Demo streaming job that reads text lines from a socket, pairs every line
  * with the count 1, groups the pairs by the line text, sums the counts inside
  * a 5-second time window and prints the windowed totals.
  */
object Demo3DataFlow {

  /**
    * Builds the dataflow and submits it for execution.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {

    // Obtain the Flink streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Set the parallelism on the environment's execution config.
    env.getConfig.setParallelism(2)

    // Create the source DataStream by connecting to a socket.
    // A test server can be started with: nc -lk 8888
    val lines: DataStream[String] = env.socketTextStream("node1", 8888)

    // Turn every incoming line into a (text, 1) pair.
    val pairs = lines.map(line => (line, 1)).setParallelism(2)

    // Group by the text (first tuple field) ...
    val keyed = pairs.keyBy(pair => pair._1)

    // ... collect each key's pairs into 5-second windows ...
    val windowed = keyed.timeWindow(Time.seconds(5))

    // ... and add up the counts (tuple field at position 1) per window.
    val counts = windowed.sum(1).setParallelism(2)

    // Print the results through a single sink instance.
    counts.print().setParallelism(1)

    // Nothing runs until the job is submitted here.
    env.execute()
  }
}
